##########################################################################
# Copyright (c) 2010-2022 Robert Bosch GmbH
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0.
#
# SPDX-License-Identifier: EPL-2.0
##########################################################################

"""
SocketCan to trc file logger
****************************

:module: socketcan_to_trc

:synopsis: Log SocketCAN messages into peak can trc file format.

.. currentmodule:: socketcan_to_trc

"""

import logging
import subprocess
import sys
import time
from textwrap import dedent

try:
    import can
except ImportError as e:
    raise ImportError(f"{e.name} dependency missing, consider installing pykiso with 'pip install pykiso[can]'")


log = logging.getLogger(__name__)


TRC_HEADER = """\
        ;$FILEVERSION=2.0
        ;$STARTTIME={starttime_days:12.3f}
        ;$COLUMNS=N,O,T,I,d,L,D
        ;
        ;   {filename:s}
        ;   start time: {starttime_str:s}
        ;   Generated by socketcan_to_trc(python)
        ;-------------------------------------------------------------------------------
        ;   Connection                                Bit rate
        ;   {can_name:s}                                      {ip_link_show:s}
        ;-------------------------------------------------------------------------------
        ; Glossary:
        ;   Direction of Message:
        ;     Rx: The frame was received
        ;     Tx: The frame was transmitted
        ;
        ;   Type of message:
        ;     DT: CAN or J1939 data frame
        ;     FD: CAN FD data frame
        ;     FB: CAN FD data frame with BRS bit set (Bit Rate Switch)
        ;     FE: CAN FD data frame with ESI bit set (Error State Indicator)
        ;     BI: CAN FD data frame with both BRS and ESI bits set
        ;     RR: Remote Request Frame
        ;     ST: Hardware Status change
        ;     ER: Error Frame
        ;     EV: Event. User-defined text, begins directly after 2-digit type indicator
        ;-------------------------------------------------------------------------------
        ;   Message   Time    Type ID     Rx/Tx
        ;   Number    Offset  |    [hex]  |  Data Length Code
        ;   |         [ms]    |    |      |  |  Data [hex]
        ;   |         |       |    |      |  |  |
        ;---+-- ------+------ +- --+----- +- +- +- -- -- -- -- -- -- --
        """


class SocketCan2Trc(can.Listener):
    """Creates a logfile containing CAN-BUS messages in the PEAK TRC format.
    Currently the only difference is the RX/TX column which is always RX.
    """

    def __init__(self, can_name: str, trc_file_name: str):
        """Initialise the logger

        :param can_name: socket ip link name
        :param trc_file_name: filename or "-" for stdout
        """
        self.started = False
        self.num = 0
        self.trc_file_name = trc_file_name
        self.can_name = can_name
        self.trc_file = sys.stdout
        self.starttime = self.get_start_time()

    def stop(self):
        """cleanup logger"""
        if not self.started:
            return
        # Remove the listener else the Can Notifier stop function will call this stop function
        # which then call again the Can Notifier again etc, and python is in an infinite loop
        self.can_notifier.remove_listener(self)
        self.can_notifier.stop()
        self.can_notifier = None
        if self.trc_file != sys.stdout:
            self.trc_file.close()
        self.bus = None
        self.started = False

    def start(self):
        """start logging"""
        if self.started:
            return

        self.bus = can.ThreadSafeBus(self.can_name, bustype="socketcan", fd=True)
        self.can_notifier = can.Notifier(self.bus, [self], timeout=1.0, loop=None)

        self.open_trc_file()

        self.starttime = self.get_start_time()
        ip_link_show = self.get_ip_link_show()
        starttime_days = self.get_start_time_days()

        self.log_trc_header(starttime_days, "Today", ip_link_show)
        self.started = True

    def on_message_received(self, msg: can.Message = None):
        """Increment message counter and log message.

        Override the can.Notifier.on_message_received.

        :param msg: message to log
        """
        self.log_can_frame(self.num, msg)
        self.num += 1

    @staticmethod
    def get_start_time():
        """return the system start time in float to have a offset of the time column

        :return: start time as an offset
        """
        return time.time()

    @staticmethod
    def get_start_time_days():
        """:return:  the days in double since 30.12.1899"""
        return time.time() / 60 / 60 / 24 + 25569

    def log_trc_header(self, starttime_days: float, starttime_str: str, ip_link_show: str):
        """Write the header of the TRC file

        :param starttime_days: number of days since 30.12.1899
        :param starttime_str: starttime string ( currently not used by parsers )
        :param ip_link_show: output of "ip -d link show" as an alternative to the pean information
        """
        print(
            dedent(TRC_HEADER).format(
                starttime_days=starttime_days,
                filename=self.trc_file_name,
                starttime_str=starttime_str,
                can_name=self.can_name,
                ip_link_show=ip_link_show,
            ),
            file=self.trc_file,
            flush=False,
        )

    def open_trc_file(self):
        """If trc_file_name is not stdout, open file trc-file"""

        # with not useful here
        # pylint: disable=R1732
        if self.trc_file_name != "-":
            self.trc_file = open(self.trc_file_name, mode="w", encoding="utf-8")

    @staticmethod
    def get_type(can_frame: can.Message):
        """return the TRC specific type of CAN-Frame

        :return: type string in TRC file
        """
        ret = ""
        if can_frame.is_remote_frame:
            ret = "RR"
        elif can_frame.is_error_frame:
            ret = "ER"
        elif not can_frame.is_fd:
            ret = "DT"
        elif can_frame.bitrate_switch and can_frame.error_state_indicator:
            ret = "BI"
        elif can_frame.bitrate_switch:
            ret = "FB"
        elif can_frame.error_state_indicator:
            ret = "FE"
        else:
            ret = "FD"

        return ret

    def log_can_frame(self, num: int, can_frame: can.Message):
        """Write a row to TRC file

        :param num: message number
        :param can_frame: can frame to log
        """
        if can_frame.is_error_frame:
            log.internal_warning("is errorframe")
        txt = "{num:7d} {time_msec:13.3f} {type:<6} {can_id:04x} RX {len:<2d}"
        print(
            txt.format(
                num=num,
                time_msec=(can_frame.timestamp - self.starttime) * 1000,
                type=self.get_type(can_frame),
                can_id=can_frame.arbitration_id,
                len=can_frame.dlc,
            ),
            file=self.trc_file,
            flush=False,
            end="",
        )

        if not can_frame.is_remote_frame:
            # f-string doesn't have formatting -> disable
            # pylint: disable=C0209
            for i in range(can_frame.dlc):
                print(
                    " {data:02X}".format(data=can_frame.data[i]),
                    file=self.trc_file,
                    end="",
                    flush=False,
                )

        print("", file=self.trc_file, flush=True)

    def get_ip_link_show(self):
        ''':return: the formatted output of "ip -d link show"'''
        complete_process = subprocess.run(
            ["ip", "-d", "link", "show", self.can_name],
            capture_output=True,
            check=True,
        )
        if complete_process.returncode != 0:
            raise RuntimeError("ip -d link show failed: " + complete_process.stdout.decode("ascii"))
        txt = complete_process.stdout.decode("ascii")
        lines = txt.splitlines(keepends=True)
        for i in range(1, len(lines), 1):
            lines[i] = ";                                         " + str(lines[i])
        return "".join(lines)
